Behavioral Patterns
# What does this print?
import gc


class EventBus:
    def __init__(self):
        self._listeners = {}

    def subscribe(self, event, listener):
        self._listeners.setdefault(event, []).append(listener)

    def publish(self, event, data):
        for listener in self._listeners.get(event, []):
            listener(data)


class UserService:
    def __init__(self, bus: EventBus):
        self._bus = bus

    def register(self, email: str):
        print(f"Registered {email}")
        self._bus.publish("user.registered", {"email": email})


class EmailService:
    def on_user_registered(self, data):
        print(f"Sending welcome email to {data['email']}")


bus = EventBus()
email_svc = EmailService()
user_svc = UserService(bus)
bus.subscribe("user.registered", email_svc.on_user_registered)

# Now - what if we lose the reference?
email_svc = None  # ← reassign the variable
gc.collect()

# Is the listener still called?
user_svc.register("user@example.com")  # example address

# OUTPUT: "Registered user@example.com"
#         "Sending welcome email to user@example.com"  ← STILL FIRES
The surprise: the listener is still called after email_svc = None. The EventBus holds a strong reference to the bound method email_svc.on_user_registered, and a bound method holds a strong reference to its instance (its __self__), so the entire EmailService object stays alive. In a long-running FastAPI process, every subscribe() call that is never balanced by an unsubscribe() leaks the subscriber and everything it references. One fix is weak references - and understanding why they matter is the thread that connects all Observer / Event System implementations in Python.
What You Will Learn
- Observer / Event System - publisher-subscriber with strong vs weak references, async observers, and event buses in FastAPI
- Strategy - interchangeable algorithms using Protocol for duck-typed strategies instead of ABC
- Command - encapsulating actions as objects with dataclasses, undo/redo, and retry queues
- Chain of Responsibility - middleware pipelines, mapping to FastAPI's middleware stack
- State - finite-state machines using both enum.Enum + transition tables and class-per-state
- Template Method - abstract base classes for algorithmic skeletons
- Iterator - Python's __iter__/__next__ protocol and yield-based generators
- Mediator - decoupling many-to-many object relationships
- 5 interview Q&A with senior-level answers
Prerequisites
- Comfortable with Python classes, __dunder__ methods, abc.ABC, dataclasses
- Basic asyncio (async def, await, asyncio.gather)
- Familiar with structural patterns (Module 1, Lesson 02)
- Python 3.10+
Why Behavioral Patterns?
Behavioral patterns are about communication - how objects talk to each other, share responsibilities, and encapsulate algorithms. Where structural patterns answer "how do I connect these?", behavioral patterns answer "how do these collaborate without becoming entangled?"
| Pattern | Core idea | The problem it fixes |
|---|---|---|
| Observer | Notify many dependents automatically | Tight coupling between event source and handlers |
| Strategy | Swap algorithms at runtime | Algorithm selection scattered in if/else |
| Command | Encapsulate actions as objects | No undo, no queuing, no audit log |
| Chain of Responsibility | Pass request through a handler pipeline | Hardcoded cross-cutting concerns |
| State | Object behaviour changes with internal state | Giant if/elif blocks on status fields |
| Template Method | Skeleton algorithm, steps overridable | Copy-paste of algorithm structure |
| Iterator | Uniform traversal interface | Exposing internal data structures |
| Mediator | Central hub for many-to-many communication | N² direct dependencies |
Part 1 - Observer / Event System
Strong References - The Memory Leak
The opening surprise showed that a naive event bus keeps listeners alive forever. Let us quantify the problem:
import gc


class LeakyEventBus:
    def __init__(self):
        self._listeners: dict[str, list] = {}

    def subscribe(self, event: str, listener) -> None:
        self._listeners.setdefault(event, []).append(listener)  # strong ref

    def publish(self, event: str, data: object) -> None:
        for listener in self._listeners.get(event, []):
            listener(data)


class HeavyService:
    def __init__(self, name: str):
        self.name = name
        self._data = [0] * 100_000  # 800 KB of "important state"

    def handle(self, event):
        pass


bus = LeakyEventBus()

# Simulate registering and "destroying" 100 services
for i in range(100):
    svc = HeavyService(f"service-{i}")
    bus.subscribe("tick", svc.handle)
    svc = None  # we think we destroyed it

gc.collect()
print(f"Listeners still registered: {len(bus._listeners['tick'])}")  # 100 - all leaked
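With strong references, the leak only goes away if every subscribe is paired with an unsubscribe. One way to make that pairing harder to forget is to have subscribe return a callable that removes the listener. The sketch below is a hypothetical variation (TrackedEventBus, Service, and the returned unsubscribe function are illustrative names, not part of the code above); the weak reference is used only as a probe to observe whether the service was actually freed.

```python
import gc
import weakref
from typing import Any, Callable


class TrackedEventBus:
    """Strong-reference bus whose subscribe() returns an unsubscribe callable (illustrative)."""

    def __init__(self) -> None:
        self._listeners: dict[str, list[Callable[[Any], None]]] = {}

    def subscribe(self, event: str, listener: Callable[[Any], None]) -> Callable[[], None]:
        bucket = self._listeners.setdefault(event, [])
        bucket.append(listener)

        def unsubscribe() -> None:
            # Safe to call more than once
            if listener in bucket:
                bucket.remove(listener)

        return unsubscribe


class Service:
    def handle(self, data: Any) -> None:
        pass


bus = TrackedEventBus()
svc = Service()
probe = weakref.ref(svc)  # lets us observe whether svc is still alive
unsubscribe = bus.subscribe("tick", svc.handle)

svc = None
gc.collect()
alive_while_subscribed = probe() is not None  # the bus still pins the service

unsubscribe()
del unsubscribe  # the closure itself also holds the bound method
gc.collect()
alive_after_unsubscribe = probe() is not None

print(alive_while_subscribed, alive_after_unsubscribe)
```

Note that the unsubscribe closure keeps the listener alive too, so it should be released once used; on CPython the service is then freed as soon as the last reference disappears.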
Weak Reference Event Bus - The Fix
import gc
import inspect
import weakref
from typing import Any, Callable


class WeakEventBus:
    """
    Event bus that holds weak references to listeners.
    Once a listener's owner is garbage-collected, the listener is skipped
    and pruned on the next publish - no explicit unsubscribe needed.
    """

    def __init__(self) -> None:
        # Map event_name → list of weak references
        self._listeners: dict[str, list[weakref.ref]] = {}

    def subscribe(self, event: str, listener: Callable) -> None:
        if event not in self._listeners:
            self._listeners[event] = []
        # Bound methods need weakref.WeakMethod, not weakref.ref.
        # inspect.ismethod is stricter than hasattr(listener, "__self__"),
        # which is also true for built-in functions that WeakMethod rejects.
        if inspect.ismethod(listener):
            ref: weakref.ref = weakref.WeakMethod(listener)
        else:
            ref = weakref.ref(listener)
        self._listeners[event].append(ref)

    def publish(self, event: str, data: Any = None) -> None:
        if event not in self._listeners:
            return
        alive: list[weakref.ref] = []
        for ref in self._listeners[event]:
            listener = ref()  # dereference
            if listener is not None:
                listener(data)
                alive.append(ref)
            # If None: object was GC'd - silently skip and do not re-add
        self._listeners[event] = alive  # compact dead references

    def listener_count(self, event: str) -> int:
        # Count live listeners only: dead refs stay in the list until the next publish
        return sum(1 for ref in self._listeners.get(event, []) if ref() is not None)


# ── Demonstrate automatic cleanup ─────────────────────────────────────────────
bus = WeakEventBus()


class AnalyticsService:
    def on_user_registered(self, data):
        print(f"[Analytics] Recording registration: {data['email']}")


analytics = AnalyticsService()
bus.subscribe("user.registered", analytics.on_user_registered)
print(f"Listeners before GC: {bus.listener_count('user.registered')}")  # 1

# Destroy the service
analytics = None
gc.collect()
print(f"Listeners after GC: {bus.listener_count('user.registered')}")  # 0
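The special case for bound methods in subscribe is easy to overlook. Each access to obj.method creates a fresh bound-method object, so a plain weakref.ref to it is dead as soon as that temporary is released, while weakref.WeakMethod tracks the underlying instance and function instead. A minimal sketch of the difference follows; the Listener class is a hypothetical stand-in, and the immediate-collection behaviour relies on CPython's reference counting.

```python
import gc
import weakref


class Listener:
    def on_event(self, data):
        return f"got {data}"


listener = Listener()

# Each attribute access builds a new bound-method object
distinct_objects = listener.on_event is not listener.on_event

# A plain weak reference targets that temporary, which is freed right away
plain_ref = weakref.ref(listener.on_event)
gc.collect()
plain_is_dead = plain_ref() is None

# WeakMethod stays usable for as long as the instance is alive
method_ref = weakref.WeakMethod(listener.on_event)
result_while_alive = method_ref()("ping")

del listener
gc.collect()
method_dead_after_del = method_ref() is None
```

The same reasoning applies to lambdas and local functions passed straight into a weak bus: if nothing else holds them, the subscription can vanish immediately.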
Full Production Event Bus
from __future__ import annotations

import inspect
import logging
import weakref
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class Event:
    name: str
    data: Any = None
    source: str = "unknown"


EventHandler = Callable[[Event], None]


class EventBus:
    """
    Production-ready synchronous event bus with:
    - Weak references to avoid memory leaks
    - Error isolation (one bad handler does not stop others)
    - Wildcard subscriptions ("*" receives all events)
    - Handler priority ordering
    """

    def __init__(self) -> None:
        self._listeners: dict[str, list[tuple[int, weakref.ref]]] = defaultdict(list)

    def subscribe(
        self,
        event_name: str,
        handler: EventHandler,
        priority: int = 0,
    ) -> None:
        """
        Subscribe to an event. Higher priority handlers run first.
        Weak reference is held - keep a strong reference to your handler object.
        """
        # inspect.ismethod avoids treating built-ins (which also have __self__) as bound methods
        if inspect.ismethod(handler):
            ref: weakref.ref = weakref.WeakMethod(handler)
        else:
            ref = weakref.ref(handler)
        self._listeners[event_name].append((priority, ref))
        self._listeners[event_name].sort(key=lambda x: -x[0])  # sort descending

    def unsubscribe(self, event_name: str, handler: EventHandler) -> None:
        if event_name not in self._listeners:
            return
        self._listeners[event_name] = [
            (pri, ref)
            for pri, ref in self._listeners[event_name]
            if ref() is not None and ref() != handler
        ]

    def publish(self, event: Event) -> int:
        """
        Publish an event. Returns number of handlers successfully called.
        Errors in individual handlers are logged but do not propagate.
        """
        handlers_called = 0
        for event_name in (event.name, "*"):
            alive = []
            for priority, ref in self._listeners.get(event_name, []):
                handler = ref()
                if handler is None:
                    continue  # GC'd - drop silently
                alive.append((priority, ref))
                try:
                    handler(event)
                    handlers_called += 1
                except Exception as exc:
                    logger.error(
                        "Handler %r failed for event %r: %s",
                        handler,
                        event.name,
                        exc,
                        exc_info=True,
                    )
            self._listeners[event_name] = alive  # compact
        return handlers_called
# ── Wire up a FastAPI-style application ───────────────────────────────────────
class EmailService:
    def on_user_registered(self, event: Event) -> None:
        email = event.data["email"]
        print(f"[Email] Sending welcome email to {email}")


class AnalyticsService:
    def on_user_registered(self, event: Event) -> None:
        email = event.data["email"]
        print(f"[Analytics] Tracking registration: {email}")


class CRMService:
    def on_any_event(self, event: Event) -> None:
        print(f"[CRM] Received event: {event.name} from {event.source}")


bus = EventBus()
email_svc = EmailService()
analytics_svc = AnalyticsService()
crm_svc = CRMService()

bus.subscribe("user.registered", email_svc.on_user_registered, priority=10)
bus.subscribe("user.registered", analytics_svc.on_user_registered, priority=5)
bus.subscribe("*", crm_svc.on_any_event, priority=1)

event = Event(name="user.registered", data={"email": "user@example.com"})  # example payload
called = bus.publish(event)
print(f"\nHandlers called: {called}")
Async Observer - asyncio-compatible
In FastAPI or any async Python service, synchronous observers that perform I/O block the event loop for as long as they run. Here is an async-compatible version:
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

AsyncHandler = Callable[..., Awaitable[None]]


class AsyncEventBus:
    """Event bus where all handlers are coroutines."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[AsyncHandler]] = {}

    def subscribe(self, event: str, handler: AsyncHandler) -> None:
        self._handlers.setdefault(event, []).append(handler)

    async def publish(self, event: str, data: Any = None) -> None:
        """Run all handlers concurrently, gather results."""
        handlers = self._handlers.get(event, [])
        if not handlers:
            return
        await asyncio.gather(
            *(handler(data) for handler in handlers),
            return_exceptions=True,  # isolate failures
        )


async def send_welcome_email(data: dict) -> None:
    await asyncio.sleep(0.01)  # simulate network I/O
    print(f"[Email] Welcome sent to {data['email']}")


async def log_to_analytics(data: dict) -> None:
    await asyncio.sleep(0.005)
    print(f"[Analytics] Logged {data['email']}")


async def main():
    bus = AsyncEventBus()
    bus.subscribe("user.registered", send_welcome_email)
    bus.subscribe("user.registered", log_to_analytics)
    # All handlers run concurrently - total latency ≈ max(individual latencies)
    await bus.publish("user.registered", {"email": "user@example.com"})  # example address


asyncio.run(main())
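One caveat with return_exceptions=True: failures come back as ordinary values, so if the result of gather is discarded, a failing handler goes unnoticed. A sketch of one way to surface them is shown below; publish_and_report, ok_handler, and failing_handler are hypothetical names introduced for illustration.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("events")
AsyncHandler = Callable[[Any], Awaitable[None]]


async def publish_and_report(handlers: list[AsyncHandler], data: Any) -> list[BaseException]:
    """Run handlers concurrently; log and return the exceptions they raised."""
    results = await asyncio.gather(
        *(handler(data) for handler in handlers),
        return_exceptions=True,
    )
    errors: list[BaseException] = []
    # gather preserves input order, so results line up with handlers
    for handler, result in zip(handlers, results):
        if isinstance(result, BaseException):
            logger.error("Handler %r failed: %r", handler, result)
            errors.append(result)
    return errors


async def ok_handler(data: dict) -> None:
    await asyncio.sleep(0)


async def failing_handler(data: dict) -> None:
    raise RuntimeError("SMTP unavailable")  # simulated failure


errors = asyncio.run(publish_and_report([ok_handler, failing_handler], {"id": 1}))
print(errors)
```

Whether to log, retry, or re-raise is a design decision; the point is only that the returned list has to be inspected somewhere.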
FastAPI Integration Pattern
# In a FastAPI app: use lifespan to create the bus and share it
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI

_bus: AsyncEventBus | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global _bus
    _bus = AsyncEventBus()
    # Wire up handlers
    _bus.subscribe("user.registered", send_welcome_email)
    _bus.subscribe("user.registered", log_to_analytics)
    yield
    _bus = None


app = FastAPI(lifespan=lifespan)


def get_bus() -> AsyncEventBus:
    assert _bus is not None
    return _bus


@app.post("/users")
async def create_user(email: str, bus: AsyncEventBus = Depends(get_bus)):
    # ... create user in DB ...
    await bus.publish("user.registered", {"email": email})
    return {"status": "created"}
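Awaiting publish inside the endpoint means the response is not sent until every handler has finished. If handlers are allowed to complete after the response, one option is to schedule the publish as a task. The asyncio documentation recommends keeping a strong reference to such tasks so they are not garbage-collected mid-flight. The sketch below shows that pattern in plain asyncio; publish_in_background, _background_tasks, and fake_publish are hypothetical names, and a real application would also need to decide what to do with tasks still pending at shutdown.

```python
import asyncio
from typing import Any, Callable, Coroutine

_background_tasks: set[asyncio.Task] = set()  # strong refs keep tasks alive


def publish_in_background(
    publish: Callable[[str, Any], Coroutine[Any, Any, None]],
    event: str,
    data: Any,
) -> asyncio.Task:
    """Schedule publish(event, data) without awaiting it (needs a running loop)."""
    task = asyncio.create_task(publish(event, data))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)  # drop the ref when finished
    return task


received: list[tuple[str, Any]] = []


async def fake_publish(event: str, data: Any) -> None:
    await asyncio.sleep(0.01)  # simulate slow handlers
    received.append((event, data))


async def main() -> bool:
    task = publish_in_background(fake_publish, "user.registered", {"id": 42})
    returned_before_done = not task.done()  # the caller was not blocked
    await task  # awaited here only so the demo can observe the result
    return returned_before_done


returned_early = asyncio.run(main())
```

The trade-off is that failures inside the task no longer reach the request, so they have to be logged or monitored separately.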
Part 2 - Strategy
The Problem
A content classification system needs to route text through different classifiers depending on context:
def classify(text: str, mode: str) -> str:
    if mode == "rules":
        if "invoice" in text.lower():
            return "billing"
        elif "error" in text.lower():
            return "support"
        return "general"
    elif mode == "ml":
        # call a local sklearn model
        pass
    elif mode == "llm":
        # call an LLM API
        pass
    else:
        raise ValueError(f"Unknown mode: {mode}")
This function grows with every new classifier. It cannot be tested in isolation. Swapping classifiers at runtime requires string matching. The open/closed principle is violated at every addition.
Strategy with Protocol - Duck-Typed Strategies
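Using Protocol instead of ABC means any object with a classify(text) method of the right shape is automatically a strategy - no inheritance required. A bare function does not qualify on its own, because the protocol asks for a classify method rather than __call__. This is the Pythonic approach for strategies.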
from __future__ import annotations

from typing import Protocol, runtime_checkable


# ── Strategy interface - Protocol, not ABC ────────────────────────────────────
@runtime_checkable
class ClassificationStrategy(Protocol):
    """
    Any object with a `classify(text) -> str` method is a valid strategy.
    No inheritance needed.
    """

    def classify(self, text: str) -> str:
        ...


# ── Concrete strategies ───────────────────────────────────────────────────────
class RuleBasedClassifier:
    """Fast, deterministic, no ML infrastructure needed."""

    def __init__(self, rules: dict[str, list[str]]) -> None:
        self._rules = rules  # {category: [keyword, ...]}

    def classify(self, text: str) -> str:
        lower = text.lower()
        for category, keywords in self._rules.items():
            if any(kw in lower for kw in keywords):
                return category
        return "general"


class MLClassifier:
    """sklearn-based classifier - pre-trained model loaded at init."""

    def __init__(self, model_path: str) -> None:
        print(f"[ML] Loading model from {model_path}")
        # Real: self._model = joblib.load(model_path)
        self._model_path = model_path

    def classify(self, text: str) -> str:
        # Real: return self._model.predict([text])[0]
        return "ml-predicted-category"


class LLMClassifier:
    """Uses an LLM for nuanced classification. Slow but accurate."""

    def __init__(self, client, model: str = "gpt-4o") -> None:
        self._client = client
        self._model = model

    def classify(self, text: str) -> str:
        prompt = (
            f"Classify the following text into one of: billing, support, general, spam.\n"
            f"Text: {text}\nRespond with only the category name."
        )
        # Real: response = self._client.complete(prompt)
        return "support"  # simulated


# An ad-hoc class built with type() also qualifies - Protocol is structural
trivial_classifier: ClassificationStrategy = type(
    "Trivial", (), {"classify": lambda self, text: "general"}
)()

# Verify duck typing works (isinstance only checks that classify exists)
print(isinstance(trivial_classifier, ClassificationStrategy))  # True
# ── Context - uses the strategy ───────────────────────────────────────────────
class DocumentRouter:
    """Context that holds and uses a ClassificationStrategy."""

    def __init__(self, strategy: ClassificationStrategy) -> None:
        self._strategy = strategy

    def set_strategy(self, strategy: ClassificationStrategy) -> None:
        """Strategy can be swapped at runtime."""
        self._strategy = strategy

    def route(self, text: str) -> dict:
        category = self._strategy.classify(text)
        return {"text": text[:50], "category": category, "strategy": type(self._strategy).__name__}


# ── Usage ─────────────────────────────────────────────────────────────────────
rules = {
    "billing": ["invoice", "payment", "charge", "refund"],
    "support": ["error", "broken", "help", "issue", "bug"],
    "spam": ["click here", "free money", "winner"],
}

router = DocumentRouter(RuleBasedClassifier(rules))
print(router.route("I need help with an error on my invoice"))

# Swap to ML when confidence is needed
router.set_strategy(MLClassifier("/models/doc_classifier_v3.pkl"))
print(router.route("The widget is not functioning as expected"))
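If some strategies are naturally written as plain functions, a small adapter can give them the classify method the protocol expects. The sketch below re-declares the protocol so that it runs on its own; FunctionStrategy and contains_refund are hypothetical names, not part of the lesson's code.

```python
from typing import Callable, Protocol, runtime_checkable


@runtime_checkable
class ClassificationStrategy(Protocol):
    def classify(self, text: str) -> str: ...


class FunctionStrategy:
    """Adapts a plain text -> category function to the strategy protocol."""

    def __init__(self, func: Callable[[str], str]) -> None:
        self._func = func

    def classify(self, text: str) -> str:
        return self._func(text)


def contains_refund(text: str) -> str:
    return "billing" if "refund" in text.lower() else "general"


strategy = FunctionStrategy(contains_refund)

# The bare function has no classify method, so it is not a strategy by itself
function_is_strategy = isinstance(contains_refund, ClassificationStrategy)
adapter_is_strategy = isinstance(strategy, ClassificationStrategy)
category = strategy.classify("Please REFUND my order")
```

An alternative design is to type the context against Callable[[str], str] directly; the adapter is only useful when the rest of the code already depends on the protocol.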
Strategy Selection at Runtime
class StrategyRegistry:
    """
    Replaces if/elif chains for strategy selection with a lookup table.
    """

    def __init__(self) -> None:
        self._registry: dict[str, ClassificationStrategy] = {}

    def register(self, name: str, strategy: ClassificationStrategy) -> None:
        self._registry[name] = strategy

    def get(self, name: str) -> ClassificationStrategy:
        try:
            return self._registry[name]
        except KeyError:
            # "from None" hides the internal KeyError from the traceback
            raise ValueError(
                f"Unknown strategy: {name!r}. Available: {list(self._registry)}"
            ) from None


registry = StrategyRegistry()
registry.register("rules", RuleBasedClassifier(rules))
registry.register("ml", MLClassifier("/models/v3.pkl"))

# Now strategy selection is data-driven, not code-driven
for config in [{"mode": "rules"}, {"mode": "ml"}]:
    strategy = registry.get(config["mode"])
    router.set_strategy(strategy)
    print(router.route("My payment failed with an error"))
Protocol vs ABC for Strategies
| Dimension | Protocol | ABC |
|---|---|---|
| Inheritance required | No - structural / duck typing | Yes - must inherit |
| Third-party classes | Work automatically if they have the right shape | Must be explicitly registered (register()) |
| Static type checking | Full support via mypy | Full support via mypy |
| Runtime check | Requires @runtime_checkable; isinstance then checks only that the methods exist, not their signatures | isinstance works natively |
| Pythonic idiom | Preferred for "anything with this signature" | Preferred when you want to enforce shared implementation |
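The runtime check row deserves a closer look: isinstance against a runtime_checkable protocol verifies only that the named methods exist; it does not compare parameters or return types. A short sketch, where WrongShape is a deliberately broken, hypothetical class:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ClassificationStrategy(Protocol):
    def classify(self, text: str) -> str: ...


class WrongShape:
    def classify(self) -> int:  # wrong parameters and wrong return type
        return 0


candidate = WrongShape()

# Passes, because only the presence of "classify" is checked
passes_isinstance = isinstance(candidate, ClassificationStrategy)

try:
    candidate.classify("some text")
    call_failed = False
except TypeError:
    # The mismatch only shows up when the method is actually called
    call_failed = True
```

Static checkers such as mypy do compare signatures, which is why the runtime check is best treated as a coarse guard rather than a guarantee.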
Part 3 - Command
The Problem
A document editor needs to:
- Wrap all user actions as reversible operations (undo/redo)
- Queue and retry failed operations (network save)
- Maintain an audit log of every action
None of these is possible if operations are implemented as direct method calls that return immediately.
Commands as Dataclasses
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


# ── Command interface ─────────────────────────────────────────────────────────
class Command(ABC):
    @abstractmethod
    def execute(self) -> Any:
        ...

    @abstractmethod
    def undo(self) -> None:
        ...

    @property
    @abstractmethod
    def description(self) -> str:
        ...


# ── Document model ────────────────────────────────────────────────────────────
class Document:
    def __init__(self, content: str = "") -> None:
        self.content = content

    def __repr__(self) -> str:
        preview = self.content[:40].replace("\n", "\\n")
        return f"Document({preview!r})"
# ── Concrete commands - dataclass keeps parameters as plain fields (easy to log or serialise) ──
@dataclass
class InsertTextCommand(Command):
    document: Document
    position: int
    text: str
    _timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc), init=False)

    def execute(self) -> None:
        self.document.content = (
            self.document.content[:self.position]
            + self.text
            + self.document.content[self.position:]
        )

    def undo(self) -> None:
        # Remove exactly the characters that execute() inserted
        self.document.content = (
            self.document.content[:self.position]
            + self.document.content[self.position + len(self.text):]
        )

    @property
    def description(self) -> str:
        return f"Insert {len(self.text)} chars at position {self.position}"


@dataclass
class DeleteTextCommand(Command):
    document: Document
    start: int
    end: int
    _deleted_text: str = field(default="", init=False)

    def execute(self) -> None:
        # Remember the removed text so undo() can restore it
        self._deleted_text = self.document.content[self.start:self.end]
        self.document.content = (
            self.document.content[:self.start]
            + self.document.content[self.end:]
        )

    def undo(self) -> None:
        self.document.content = (
            self.document.content[:self.start]
            + self._deleted_text
            + self.document.content[self.start:]
        )

    @property
    def description(self) -> str:
        return f"Delete chars {self.start}–{self.end}"


@dataclass
class ReplaceTextCommand(Command):
    document: Document
    start: int
    end: int
    new_text: str
    _original_text: str = field(default="", init=False)

    def execute(self) -> None:
        self._original_text = self.document.content[self.start:self.end]
        self.document.content = (
            self.document.content[:self.start]
            + self.new_text
            + self.document.content[self.end:]
        )

    def undo(self) -> None:
        self.document.content = (
            self.document.content[:self.start]
            + self._original_text
            + self.document.content[self.start + len(self.new_text):]
        )

    @property
    def description(self) -> str:
        return f"Replace chars {self.start}–{self.end} with {self.new_text!r}"
# ── Invoker - maintains history ───────────────────────────────────────────────
class CommandHistory:
    """
    Invoker: executes commands and maintains undo/redo stacks.
    The invoker has no knowledge of what commands do.
    """

    def __init__(self, max_history: int = 100) -> None:
        self._done: list[Command] = []
        self._undone: list[Command] = []
        self._max_history = max_history

    def execute(self, command: Command) -> Any:
        result = command.execute()
        self._done.append(command)
        self._undone.clear()  # branching history: redo stack invalidated on new command
        if len(self._done) > self._max_history:
            self._done.pop(0)
        print(f" ✓ {command.description}")
        return result

    def undo(self) -> bool:
        if not self._done:
            print(" Nothing to undo")
            return False
        command = self._done.pop()
        command.undo()
        self._undone.append(command)
        print(f" ↩ Undid: {command.description}")
        return True

    def redo(self) -> bool:
        if not self._undone:
            print(" Nothing to redo")
            return False
        command = self._undone.pop()
        command.execute()
        self._done.append(command)
        print(f" ↪ Redid: {command.description}")
        return True

    def history(self) -> list[str]:
        return [cmd.description for cmd in self._done]


# ── Demo ──────────────────────────────────────────────────────────────────────
doc = Document()
history = CommandHistory()

history.execute(InsertTextCommand(doc, 0, "Hello, World!"))
history.execute(InsertTextCommand(doc, 13, " How are you?"))
history.execute(ReplaceTextCommand(doc, 0, 5, "Hi"))
print(f"\nDocument: {doc.content!r}")

history.undo()
history.undo()
print(f"After 2 undos: {doc.content!r}")

history.redo()
print(f"After 1 redo: {doc.content!r}")
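The audit-log requirement from the problem statement follows from the same idea: because every action is an object with a description, an invoker can record what ran and when. The sketch below is one possible shape, not the lesson's implementation; AuditEntry, AuditLog, and AppendCommand are hypothetical names, and AppendCommand is a tiny stand-in for the document commands above.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class AuditEntry:
    action: str  # "execute" or "undo"
    description: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass
class AppendCommand:
    """Stand-in command that appends text to a list buffer."""
    buffer: list[str]
    text: str

    def execute(self) -> None:
        self.buffer.append(self.text)

    def undo(self) -> None:
        self.buffer.pop()

    @property
    def description(self) -> str:
        return f"Append {self.text!r}"


class AuditLog:
    """Invoker that records every executed or undone command."""

    def __init__(self) -> None:
        self._entries: list[AuditEntry] = []

    def run(self, command: AppendCommand) -> None:
        command.execute()
        self._entries.append(AuditEntry("execute", command.description))

    def revert(self, command: AppendCommand) -> None:
        command.undo()
        self._entries.append(AuditEntry("undo", command.description))

    def lines(self) -> list[str]:
        return [f"{e.action}: {e.description}" for e in self._entries]


buffer: list[str] = []
log = AuditLog()
cmd = AppendCommand(buffer, "hello")
log.run(cmd)
log.revert(cmd)
print(log.lines())
```

Entries are appended only after the command call returns, so a command that raises leaves no misleading record; whether failed attempts should also be logged is a separate design choice.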
Task Queue with Retry
Commands are particularly powerful for task queues - the command object encapsulates what to do, and the queue decides when to run it and how many times to retry. The version below is synchronous and uses time.sleep for backoff:
from __future__ import annotations

import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RetryableCommand:
    command: Command  # the Command ABC defined earlier
    max_attempts: int = 3
    backoff_base: float = 0.1
    attempt: int = field(default=0, init=False)
    last_error: Exception | None = field(default=None, init=False)

    def try_execute(self) -> tuple[bool, Any]:
        self.attempt += 1
        try:
            result = self.command.execute()
            return True, result
        except Exception as exc:
            self.last_error = exc
            return False, None

    @property
    def exhausted(self) -> bool:
        return self.attempt >= self.max_attempts

    @property
    def next_wait(self) -> float:
        # Exponential backoff: base, 2*base, 4*base, ...
        return self.backoff_base * (2 ** (self.attempt - 1))


class RetryQueue:
    def __init__(self) -> None:
        self._queue: deque[RetryableCommand] = deque()
        self._failed: list[RetryableCommand] = []

    def enqueue(self, command: Command, max_attempts: int = 3) -> None:
        self._queue.append(RetryableCommand(command, max_attempts=max_attempts))

    def process_all(self) -> dict[str, int]:
        """Drain the queue, re-queuing failures until they succeed or run out of attempts."""
        stats = {"succeeded": 0, "failed": 0, "retried": 0}
        while self._queue:
            rc = self._queue.popleft()
            success, _ = rc.try_execute()
            if success:
                stats["succeeded"] += 1
                print(f" [OK] {rc.command.description}")
            elif rc.exhausted:
                stats["failed"] += 1
                self._failed.append(rc)
                print(f" [FAILED] {rc.command.description}: {rc.last_error}")
            else:
                stats["retried"] += 1
                print(f" [RETRY in {rc.next_wait:.2f}s] {rc.command.description}")
                time.sleep(rc.next_wait)  # blocking wait; acceptable for a demo
                self._queue.append(rc)  # back of the queue for another attempt
        return stats
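The backoff used by next_wait is backoff_base * 2 ** (attempt - 1), and a wait only happens after a failed attempt that is not the last one. The small sketch below works through the arithmetic for the delays a command would incur in the worst case; backoff_schedule is a hypothetical helper written for this illustration.

```python
def backoff_schedule(backoff_base: float, max_attempts: int) -> list[float]:
    """Waits after failed attempts 1..max_attempts-1 (no wait after the final attempt)."""
    return [backoff_base * (2 ** (attempt - 1)) for attempt in range(1, max_attempts)]


schedule = backoff_schedule(backoff_base=0.1, max_attempts=4)
total_wait = sum(schedule)

# Three waits for four attempts: base, 2*base, 4*base
print(schedule, round(total_wait, 3))
```

Production retry loops often add random jitter and an upper bound on the delay; neither is part of the formula above.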
Part 4 - Chain of Responsibility
The Problem
An HTTP request must pass through multiple independent processing stages: authentication, rate limiting, request validation, and then actual processing. Hardcoding them in sequence creates a monolith and makes it impossible to reorder, add, or remove stages without touching the core handler.
The Solution - Handler Pipeline
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Request:
    method: str
    path: str
    headers: dict[str, str] = field(default_factory=dict)
    body: dict = field(default_factory=dict)
    user_id: Optional[str] = None
    metadata: dict = field(default_factory=dict)


@dataclass
class Response:
    status: int
    body: str


# ── Handler interface ─────────────────────────────────────────────────────────
class Handler(ABC):
    def __init__(self) -> None:
        self._next: Optional[Handler] = None

    def set_next(self, handler: "Handler") -> "Handler":
        """Returns the handler for fluent chaining: a.set_next(b).set_next(c)"""
        self._next = handler
        return handler

    def handle(self, request: Request) -> Response:
        if self._next is not None:
            return self._next.handle(request)
        # End of chain: unhandled
        return Response(500, "No handler processed the request")

    @property
    def name(self) -> str:
        return type(self).__name__
# ── Concrete handlers ─────────────────────────────────────────────────────────
class AuthHandler(Handler):
    _VALID_TOKENS = {"Bearer secret-token-alice": "alice", "Bearer secret-token-bob": "bob"}

    def handle(self, request: Request) -> Response:
        token = request.headers.get("Authorization", "")
        user_id = self._VALID_TOKENS.get(token)
        if not user_id:
            print(f"[{self.name}] Rejected: invalid token")
            return Response(401, "Unauthorized")
        request.user_id = user_id
        print(f"[{self.name}] Authenticated as {user_id}")
        return super().handle(request)


class RateLimitHandler(Handler):
    def __init__(self, max_per_minute: int = 60) -> None:
        super().__init__()
        self._max = max_per_minute
        # Simplified: counts are never reset, so this is a lifetime cap
        # rather than a true per-minute window
        self._counts: dict[str, int] = {}

    def handle(self, request: Request) -> Response:
        user = request.user_id or "anonymous"
        self._counts[user] = self._counts.get(user, 0) + 1
        if self._counts[user] > self._max:
            print(f"[{self.name}] Rate limit exceeded for {user}")
            return Response(429, "Too Many Requests")
        print(f"[{self.name}] OK ({self._counts[user]}/{self._max} requests for {user})")
        return super().handle(request)


class ValidationHandler(Handler):
    def __init__(self, required_fields: list[str]) -> None:
        super().__init__()
        self._required = required_fields

    def handle(self, request: Request) -> Response:
        missing = [f for f in self._required if f not in request.body]
        if missing:
            print(f"[{self.name}] Missing fields: {missing}")
            return Response(400, f"Missing required fields: {missing}")
        print(f"[{self.name}] All required fields present")
        return super().handle(request)


class ProcessingHandler(Handler):
    """The actual business logic - only reached if all guards passed."""

    def handle(self, request: Request) -> Response:
        print(f"[{self.name}] Processing request for user {request.user_id}")
        return Response(200, f"Processed successfully for {request.user_id}")
# ── Build the chain ───────────────────────────────────────────────────────────
def build_pipeline() -> Handler:
    auth = AuthHandler()
    rate_limit = RateLimitHandler(max_per_minute=60)
    validation = ValidationHandler(required_fields=["course_id"])
    processing = ProcessingHandler()
    # Fluent chaining
    auth.set_next(rate_limit).set_next(validation).set_next(processing)
    return auth  # caller only holds the head


pipeline = build_pipeline()

print("=== Request 1: Valid ===")
response = pipeline.handle(Request(
    method="POST",
    path="/enroll",
    headers={"Authorization": "Bearer secret-token-alice"},
    body={"course_id": "python-foundation"},
))
print(f"Response: {response.status} {response.body}\n")

print("=== Request 2: Missing auth ===")
response = pipeline.handle(Request(
    method="POST",
    path="/enroll",
    headers={},
    body={"course_id": "python-foundation"},
))
print(f"Response: {response.status} {response.body}\n")

print("=== Request 3: Missing body field ===")
response = pipeline.handle(Request(
    method="POST",
    path="/enroll",
    headers={"Authorization": "Bearer secret-token-bob"},
    body={},
))
print(f"Response: {response.status} {response.body}\n")
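Because handlers only know their successor, the order of the chain is plain data. A helper that links handlers from a sequence makes reordering or removing a stage a one-line change. The sketch below uses a minimal stand-in Handler and a hypothetical Tag handler so that it runs on its own; build_chain is an illustrative name, not part of the code above.

```python
from typing import Optional


class Handler:
    """Minimal stand-in for the Handler base class: forwards unless overridden."""

    def __init__(self) -> None:
        self._next: Optional["Handler"] = None

    def set_next(self, handler: "Handler") -> "Handler":
        self._next = handler
        return handler

    def handle(self, request: dict) -> str:
        if self._next is not None:
            return self._next.handle(request)
        return "unhandled"


class Tag(Handler):
    """Records its label on the request, then forwards."""

    def __init__(self, label: str) -> None:
        super().__init__()
        self._label = label

    def handle(self, request: dict) -> str:
        request.setdefault("trace", []).append(self._label)
        return super().handle(request)


def build_chain(*handlers: Handler) -> Handler:
    """Link handlers in the given order and return the head of the chain."""
    if not handlers:
        raise ValueError("at least one handler is required")
    for current, following in zip(handlers, handlers[1:]):
        current.set_next(following)
    return handlers[0]


request: dict = {}
head = build_chain(Tag("auth"), Tag("rate_limit"), Tag("validation"))
outcome = head.handle(request)
print(request["trace"], outcome)
```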
Functional Pipeline - Using __call__
For simpler use cases, a functional approach using callables composes cleanly:
from typing import Callable

Middleware = Callable[[Request], Response | None]


def compose_pipeline(
    *middlewares: Middleware,
    final: Callable[[Request], Response],
) -> Callable[[Request], Response]:
    """
    Compose middlewares into a pipeline.
    Each middleware returns None to pass to next, or a Response to short-circuit.
    """
    def pipeline(request: Request) -> Response:
        for middleware in middlewares:
            result = middleware(request)
            if result is not None:
                return result
        return final(request)

    return pipeline


def auth_middleware(request: Request) -> Response | None:
    if "Authorization" not in request.headers:
        return Response(401, "Unauthorized")
    request.user_id = "authenticated-user"
    return None  # pass through


def logging_middleware(request: Request) -> Response | None:
    print(f"[LOG] {request.method} {request.path} user={request.user_id}")
    return None  # pass through


def handle_request(request: Request) -> Response:
    return Response(200, "OK")


handler = compose_pipeline(auth_middleware, logging_middleware, final=handle_request)
How FastAPI Middleware Maps to Chain of Responsibility
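FastAPI's middleware system (inherited from Starlette) follows the same idea: each middleware either short-circuits with its own response or awaits call_next to hand the request to the rest of the chain. Middleware registered later wraps middleware registered earlier, so in the example below auth_middleware_fastapi sees the request before timing_middleware does: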
import time

from fastapi import FastAPI, Request as FastAPIRequest
from fastapi.responses import JSONResponse

app = FastAPI()


@app.middleware("http")
async def timing_middleware(request: FastAPIRequest, call_next):
    """Each middleware receives the request and a `call_next` - the rest of the chain."""
    start = time.monotonic()
    response = await call_next(request)  # ← pass to next handler
    duration = time.monotonic() - start
    response.headers["X-Process-Time"] = f"{duration:.4f}"
    return response


@app.middleware("http")
async def auth_middleware_fastapi(request: FastAPIRequest, call_next):
    token = request.headers.get("Authorization", "")
    if not token.startswith("Bearer "):
        return JSONResponse({"detail": "Unauthorized"}, status_code=401)
    return await call_next(request)
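The call_next style differs from the linked handlers earlier in one respect: each middleware runs code both before and after the rest of the chain, which is often described as an onion. The framework-free sketch below imitates that shape with plain coroutines so the execution order can be inspected; wrap, timing, auth, endpoint, and trace are hypothetical names, and strings stand in for real request and response objects.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]
Middleware = Callable[[str, Handler], Awaitable[str]]

trace: list[str] = []


def wrap(middleware: Middleware, call_next: Handler) -> Handler:
    """Bind a middleware to the rest of the chain."""
    async def wrapped(request: str) -> str:
        return await middleware(request, call_next)
    return wrapped


async def timing(request: str, call_next: Handler) -> str:
    trace.append("timing: before")
    response = await call_next(request)
    trace.append("timing: after")
    return response


async def auth(request: str, call_next: Handler) -> str:
    if not request.startswith("Bearer "):
        trace.append("auth: rejected")
        return "401"  # short-circuit: call_next is never awaited
    trace.append("auth: ok")
    return await call_next(request)


async def endpoint(request: str) -> str:
    trace.append("endpoint")
    return "200"


# The outermost wrapper is applied last and sees the request first
app = wrap(auth, wrap(timing, endpoint))

ok = asyncio.run(app("Bearer token"))
ok_trace = list(trace)

trace.clear()
denied = asyncio.run(app("no token"))
denied_trace = list(trace)
```

In the rejected case neither the timing layer nor the endpoint runs, which is the short-circuit behaviour that makes ordering of middleware significant.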
Part 5 - State
The Problem
An order goes through a lifecycle: pending → paid → shipped → delivered → returned, with cancellation possible before shipment. Without the State pattern:
class Order:
    def pay(self):
        if self.status == "pending":
            self.status = "paid"
        elif self.status == "paid":
            raise ValueError("Already paid")
        elif self.status == "shipped":
            raise ValueError("Cannot pay after shipment")
        # ... more if/elif for every status × method combination
With 6 states and 5 methods, you have 30 branches, all in one class. Every new state or transition touches this class.
Solution A - Enum + Transition Table (Preferred for Simple FSMs)
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum, auto
from typing import Optional


class OrderStatus(Enum):
    PENDING = auto()
    PAID = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()
    RETURNED = auto()


# Transition table: (current_state, action) → new_state
# Missing entries are invalid transitions
TRANSITIONS: dict[tuple[OrderStatus, str], OrderStatus] = {
    (OrderStatus.PENDING, "pay"): OrderStatus.PAID,
    (OrderStatus.PENDING, "cancel"): OrderStatus.CANCELLED,
    (OrderStatus.PAID, "ship"): OrderStatus.SHIPPED,
    (OrderStatus.PAID, "cancel"): OrderStatus.CANCELLED,
    (OrderStatus.SHIPPED, "deliver"): OrderStatus.DELIVERED,
    (OrderStatus.DELIVERED, "return"): OrderStatus.RETURNED,
}


@dataclass
class OrderEvent:
    action: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: dict = field(default_factory=dict)
class Order:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id
        self._status = OrderStatus.PENDING
        self._history: list[OrderEvent] = []

    @property
    def status(self) -> OrderStatus:
        return self._status

    def transition(self, action: str, **metadata) -> None:
        key = (self._status, action)
        new_status = TRANSITIONS.get(key)
        if new_status is None:
            raise ValueError(
                f"Invalid transition: cannot '{action}' an order in state {self._status.name}"
            )
        old_status = self._status
        self._status = new_status
        event = OrderEvent(
            action=action,
            metadata={"from": old_status.name, "to": new_status.name, **metadata},
        )
        self._history.append(event)
        print(f"[Order {self.order_id}] {old_status.name} → {new_status.name}")

    def pay(self, payment_id: str = "") -> None:
        self.transition("pay", payment_id=payment_id)

    def ship(self, tracking_number: str = "") -> None:
        self.transition("ship", tracking=tracking_number)

    def deliver(self) -> None:
        self.transition("deliver")

    def cancel(self, reason: str = "") -> None:
        self.transition("cancel", reason=reason)

    def return_order(self, reason: str = "") -> None:
        self.transition("return", reason=reason)

    def history(self) -> list[str]:
        return [f"{e.action}: {e.metadata}" for e in self._history]


# ── Demo ──────────────────────────────────────────────────────────────────────
order = Order("ORD-001")
order.pay(payment_id="pay_abc123")
order.ship(tracking_number="TRACK-9999")
order.deliver()

try:
    order.pay()  # invalid transition
except ValueError as e:
    print(f"Error: {e}")

print("\nOrder history:")
for entry in order.history():
    print(f" {entry}")
Solution B - Class-per-State (Preferred for Complex State-Specific Logic)
When each state has significant unique logic (not just transitions), a class per state avoids giant if/elif blocks:
from __future__ import annotations

from abc import ABC, abstractmethod

class OrderState(ABC):
    """Each state knows its own valid transitions."""
    @abstractmethod
    def pay(self, order: "OrderFSM") -> None: ...
    @abstractmethod
    def ship(self, order: "OrderFSM") -> None: ...
    @abstractmethod
    def deliver(self, order: "OrderFSM") -> None: ...
    @abstractmethod
    def cancel(self, order: "OrderFSM") -> None: ...
    def _invalid(self, action: str) -> None:
        raise ValueError(
            f"Cannot '{action}' in state {type(self).__name__}"
        )
    def __repr__(self) -> str:
        return type(self).__name__

class PendingState(OrderState):
    def pay(self, order: "OrderFSM") -> None:
        print("[State] Pending → Paid: processing payment")
        order._state = PaidState()
    def ship(self, order: "OrderFSM") -> None:
        self._invalid("ship")
    def deliver(self, order: "OrderFSM") -> None:
        self._invalid("deliver")
    def cancel(self, order: "OrderFSM") -> None:
        print("[State] Pending → Cancelled: cancelling order")
        order._state = CancelledState()

class PaidState(OrderState):
    def pay(self, order: "OrderFSM") -> None:
        self._invalid("pay")  # already paid
    def ship(self, order: "OrderFSM") -> None:
        print("[State] Paid → Shipped: dispatching to courier")
        order._state = ShippedState()
    def deliver(self, order: "OrderFSM") -> None:
        self._invalid("deliver")
    def cancel(self, order: "OrderFSM") -> None:
        print("[State] Paid → Cancelled: issuing refund")
        order._state = CancelledState()

class ShippedState(OrderState):
    def pay(self, order: "OrderFSM") -> None:
        self._invalid("pay")
    def ship(self, order: "OrderFSM") -> None:
        self._invalid("ship")  # already shipped
    def deliver(self, order: "OrderFSM") -> None:
        print("[State] Shipped → Delivered: marking as delivered")
        order._state = DeliveredState()
    def cancel(self, order: "OrderFSM") -> None:
        self._invalid("cancel")  # too late to cancel

class DeliveredState(OrderState):
    def pay(self, order: "OrderFSM") -> None:
        self._invalid("pay")
    def ship(self, order: "OrderFSM") -> None:
        self._invalid("ship")
    def deliver(self, order: "OrderFSM") -> None:
        self._invalid("deliver")  # already delivered
    def cancel(self, order: "OrderFSM") -> None:
        self._invalid("cancel")

class CancelledState(OrderState):
    def pay(self, order: "OrderFSM") -> None:
        self._invalid("pay")
    def ship(self, order: "OrderFSM") -> None:
        self._invalid("ship")
    def deliver(self, order: "OrderFSM") -> None:
        self._invalid("deliver")
    def cancel(self, order: "OrderFSM") -> None:
        self._invalid("cancel")  # already cancelled

class OrderFSM:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id
        self._state: OrderState = PendingState()
    @property
    def state(self) -> str:
        return repr(self._state)
    def pay(self) -> None:
        self._state.pay(self)
    def ship(self) -> None:
        self._state.ship(self)
    def deliver(self) -> None:
        self._state.deliver(self)
    def cancel(self) -> None:
        self._state.cancel(self)

order = OrderFSM("ORD-002")
print(f"State: {order.state}")
order.pay()
order.ship()
order.deliver()
try:
    order.cancel()
except ValueError as e:
    print(f"Error: {e}")
Choosing Between Approaches
| Criterion | Enum + Transition Table | Class-per-State |
|---|---|---|
| Number of states | < 10, simple logic | 10+, complex per-state logic |
| Transition logic | Uniform | State-specific side effects |
| Adding new states | Edit the transition dict | Add a new class |
| Readability | All transitions visible at a glance | Logic scattered across classes |
| Testing | Test the table | Test each state class |
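The "Test the table" row can be made concrete. The following is a minimal sketch, assuming the six OrderStatus members and the TRANSITIONS entries from Solution A (re-declared here so the block runs on its own). The helpers terminal_states and reachable_from are illustrative names, not part of the original code.

```python
from enum import Enum, auto


class OrderStatus(Enum):
    # Assumed to match the enum used in Solution A
    PENDING = auto()
    PAID = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()
    RETURNED = auto()


TRANSITIONS = {
    (OrderStatus.PENDING, "pay"): OrderStatus.PAID,
    (OrderStatus.PENDING, "cancel"): OrderStatus.CANCELLED,
    (OrderStatus.PAID, "ship"): OrderStatus.SHIPPED,
    (OrderStatus.PAID, "cancel"): OrderStatus.CANCELLED,
    (OrderStatus.SHIPPED, "deliver"): OrderStatus.DELIVERED,
    (OrderStatus.DELIVERED, "return"): OrderStatus.RETURNED,
}


def terminal_states(table):
    """States that never appear as the source of a transition."""
    sources = {state for state, _action in table}
    return set(OrderStatus) - sources


def reachable_from(start, table):
    """All states reachable from `start` by following table entries."""
    seen = {start}
    frontier = [start]
    while frontier:
        current = frontier.pop()
        for (source, _action), target in table.items():
            if source is current and target not in seen:
                seen.add(target)
                frontier.append(target)
    return seen


print(sorted(s.name for s in terminal_states(TRANSITIONS)))  # ['CANCELLED', 'RETURNED']
print(reachable_from(OrderStatus.PENDING, TRANSITIONS) == set(OrderStatus))  # True
```

Because the whole machine is data, checks like "no state is unreachable" or "only CANCELLED and RETURNED are terminal" become ordinary assertions over a dict.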
Part 6 - Template Method
The Core Idea
Template Method defines the skeleton of an algorithm in a base class and lets subclasses override specific steps. Python's ABC and abstractmethod are the natural implementation:
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any

@dataclass
class TrainingResult:
    model_name: str
    train_loss: float
    val_loss: float
    epochs: int

class ModelTrainer(ABC):
    """
    Template Method: defines the training loop skeleton.
    Subclasses override individual steps.
    """
    def train(self, config: dict) -> TrainingResult:
        """The template method: orchestrates all steps; subclasses should not override it."""
        print(f"[{self.__class__.__name__}] Starting training")
        # Fixed skeleton - same for all subclasses
        data = self._load_data(config["data_path"])
        model = self._build_model(config)
        data = self._preprocess(data)
        train_loss, val_loss = self._run_training_loop(model, data, config)
        self._save_model(model, config.get("output_path", "/tmp/model"))
        return TrainingResult(
            model_name=config.get("model_name", "unnamed"),
            train_loss=train_loss,
            val_loss=val_loss,
            epochs=config.get("epochs", 1),
        )
    # ── Abstract steps - must be implemented ─────────────────────────────────
    @abstractmethod
    def _load_data(self, path: str) -> Any: ...
    @abstractmethod
    def _build_model(self, config: dict) -> Any: ...
    @abstractmethod
    def _run_training_loop(self, model: Any, data: Any, config: dict) -> tuple[float, float]: ...
    # ── Hooks - optional overrides with default implementations ──────────────
    def _preprocess(self, data: Any) -> Any:
        """Hook: subclasses may override for custom preprocessing."""
        return data
    def _save_model(self, model: Any, path: str) -> None:
        """Hook: default save behaviour."""
        print(f" Saving model to {path}")

class BERTTrainer(ModelTrainer):
    def _load_data(self, path: str) -> Any:
        print(f" [BERT] Loading tokenised dataset from {path}")
        return {"input_ids": [], "labels": []}
    def _build_model(self, config: dict) -> Any:
        print(f" [BERT] Building BERT model with config {config}")
        return "bert_model_object"
    def _preprocess(self, data: Any) -> Any:
        print(" [BERT] Applying BERT-specific preprocessing")
        return data
    def _run_training_loop(self, model, data, config) -> tuple[float, float]:
        epochs = config.get("epochs", 3)
        print(f" [BERT] Training for {epochs} epochs")
        return 0.243, 0.281  # simulated losses

class XGBoostTrainer(ModelTrainer):
    def _load_data(self, path: str) -> Any:
        print(f" [XGB] Loading tabular data from {path}")
        return {"X": [], "y": []}
    def _build_model(self, config: dict) -> Any:
        print(" [XGB] Building XGBoost model")
        return "xgb_model_object"
    def _run_training_loop(self, model, data, config) -> tuple[float, float]:
        print(" [XGB] Running boosting rounds")
        return 0.12, 0.15  # simulated losses

for trainer_cls, config in [
    (BERTTrainer, {"data_path": "/data/reviews", "model_name": "bert-finetuned", "epochs": 3}),
    (XGBoostTrainer, {"data_path": "/data/tabular", "model_name": "xgb-v2"}),
]:
    result = trainer_cls().train(config)
    print(f"\nResult: {result}\n")
Abstract methods vs Hooks: abstract methods must be overridden (training loop); hooks have default implementations and are optionally overridden (preprocessing, saving).
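The difference shows up at instantiation time. Below is a minimal sketch using a hypothetical Report hierarchy (not part of the trainer example): a subclass that skips an abstract step cannot be instantiated, while a hook falls back to its default. typing.final is used here only as a signal to static type checkers that render() should not be overridden; Python does not enforce it at runtime.

```python
from abc import ABC, abstractmethod
from typing import final


class Report(ABC):
    @final  # checked by static type checkers only, not at runtime
    def render(self) -> str:
        """Template method: fixed order of steps."""
        return f"{self._header()}\n{self._body()}"

    @abstractmethod
    def _body(self) -> str: ...

    def _header(self) -> str:
        """Hook: default header, optionally overridden."""
        return "REPORT"


class IncompleteReport(Report):
    pass  # forgets to implement _body


class SalesReport(Report):
    def _body(self) -> str:
        return "sales: 42"


try:
    IncompleteReport()
except TypeError as exc:
    # ABC refuses to instantiate a class with unimplemented abstract methods
    print(f"Rejected: {exc}")

print(SalesReport().render())  # header comes from the default hook
```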
Part 7 - Iterator
Python's Iterator Protocol
Python's iterator protocol is a first-class language feature. An iterable is any object whose __iter__ returns an iterator; an iterator is an object whose __iter__ returns self and whose __next__ returns the next item or raises StopIteration when exhausted. Generators make both trivially easy.
from __future__ import annotations

from typing import Generic, Iterator, TypeVar

T = TypeVar("T")

class BoundedBuffer(Generic[T]):
    """
    A fixed-capacity buffer with a proper iterator.
    Shows __iter__ + __next__ from scratch, and the same via a generator.
    """
    def __init__(self, capacity: int) -> None:
        self._capacity = capacity
        self._data: list[T] = []
    def put(self, item: T) -> None:
        if len(self._data) >= self._capacity:
            raise BufferError("Buffer is full")
        self._data.append(item)
    def __len__(self) -> int:
        return len(self._data)
    # ── Full iterator object ──────────────────────────────────────────────────
    def __iter__(self) -> "BufferIterator[T]":
        return BufferIterator(self._data[:])  # snapshot, so later put() calls do not affect it
    # ── Alternative: generator-based (much simpler) ───────────────────────────
    def items(self) -> Iterator[T]:
        """Generator: yields one item at a time without copying the list."""
        yield from self._data

class BufferIterator(Generic[T]):
    def __init__(self, data: list[T]) -> None:
        self._data = data
        self._pos = 0
    def __iter__(self) -> "BufferIterator[T]":
        return self
    def __next__(self) -> T:
        if self._pos >= len(self._data):
            raise StopIteration
        item = self._data[self._pos]
        self._pos += 1
        return item

buf: BoundedBuffer[int] = BoundedBuffer(capacity=5)
for i in range(5):
    buf.put(i * 10)
# Works in for loops, list comprehensions, zip, etc.
print(list(buf))  # [0, 10, 20, 30, 40]
print([x for x in buf if x > 15])  # [20, 30, 40]
print(list(zip(buf, ["a", "b", "c"])))  # [(0, 'a'), (10, 'b'), (20, 'c')]
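The buffer can be looped over three times in the demo because it is an iterable, not an iterator: each call to __iter__ hands out a fresh iterator. A small sketch of that distinction, using a hypothetical Countdown class whose __iter__ is a generator function:

```python
class Countdown:
    """Iterable: every __iter__ call returns a new, independent iterator."""

    def __init__(self, start: int) -> None:
        self._start = start

    def __iter__(self):
        # A generator function: calling it creates a fresh generator (an iterator)
        n = self._start
        while n > 0:
            yield n
            n -= 1


countdown = Countdown(3)
print(list(countdown))  # [3, 2, 1]
print(list(countdown))  # [3, 2, 1] - the iterable can be traversed again

it = iter(countdown)
print(iter(it) is it)   # True - an iterator returns itself from __iter__
print(list(it))         # [3, 2, 1]
print(list(it))         # [] - an iterator is exhausted after one pass
```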
Lazy Tree Iterator
Generators make recursive iteration elegant - values are produced one at a time, so there is no need to collect the whole traversal into a list first:
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field
from typing import Iterator

@dataclass
class TreeNode:
    value: int
    children: list["TreeNode"] = field(default_factory=list)
    def add(self, child: "TreeNode") -> "TreeNode":
        self.children.append(child)
        return self
    def depth_first(self) -> Iterator[int]:
        """Lazy depth-first traversal - values are yielded on demand, no result list is built."""
        yield self.value
        for child in self.children:
            yield from child.depth_first()  # delegate to the child's generator
    def breadth_first(self) -> Iterator[int]:
        """Lazy BFS via a local deque."""
        queue: deque["TreeNode"] = deque([self])
        while queue:
            node = queue.popleft()
            yield node.value
            queue.extend(node.children)
    def __iter__(self) -> Iterator[int]:
        return self.depth_first()

root = TreeNode(1)
root.add(TreeNode(2).add(TreeNode(4)).add(TreeNode(5)))
root.add(TreeNode(3).add(TreeNode(6)))
print(f"DFS: {list(root.depth_first())}")  # [1, 2, 4, 5, 3, 6]
print(f"BFS: {list(root.breadth_first())}")  # [1, 2, 3, 4, 5, 6]
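To see the laziness directly, the sketch below records which nodes are actually visited when only the first three values are requested. It uses a stripped-down, hypothetical Node class and a module-level visited list purely for illustration; itertools.islice stops pulling from the generator once it has three items.

```python
from itertools import islice

visited = []  # records every node the traversal actually touches


class Node:
    def __init__(self, value, *children):
        self.value = value
        self.children = list(children)

    def depth_first(self):
        visited.append(self.value)
        yield self.value
        for child in self.children:
            yield from child.depth_first()


# Same shape as the tree above: 1 -> (2 -> (4, 5), 3 -> (6))
root = Node(1, Node(2, Node(4), Node(5)), Node(3, Node(6)))

first_three = list(islice(root.depth_first(), 3))
print(first_three)  # [1, 2, 4]
print(visited)      # [1, 2, 4] - nodes 5, 3 and 6 were never touched
```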
Part 8 - Mediator
The Problem
In a real-time collaboration system, components need to communicate with each other: a chat panel, a presence indicator, a notification banner, and a message log. Without a mediator, each component holds direct references to every other one, so N components need N × (N − 1) point-to-point references, which grows as N².
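A quick back-of-the-envelope comparison, counting directed references under the assumption that every component must be able to reach every other one. The function names are illustrative only.

```python
def point_to_point_links(n: int) -> int:
    """Directed references when every component knows every other one."""
    return n * (n - 1)


def mediated_links(n: int) -> int:
    """Directed references with a hub: component -> hub and hub -> component."""
    return 2 * n


for n in (4, 10, 50):
    print(f"{n:>2} components: {point_to_point_links(n):>4} direct vs {mediated_links(n):>3} via a mediator")
```

For the four components named above that is 12 direct references against 8 through a hub; at 50 components the gap is 2450 against 100.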
The Solution - Central Hub
from __future__ import annotations

from abc import ABC, abstractmethod

class ChatMediator(ABC):
    @abstractmethod
    def send_message(self, sender: "Participant", message: str) -> None: ...
    @abstractmethod
    def join(self, participant: "Participant") -> None: ...
    @abstractmethod
    def leave(self, participant: "Participant") -> None: ...

class Participant(ABC):
    def __init__(self, name: str, mediator: ChatMediator) -> None:
        self.name = name
        self._mediator = mediator
    def send(self, message: str) -> None:
        self._mediator.send_message(self, message)
    @abstractmethod
    def receive(self, sender_name: str, message: str) -> None: ...

class ChatRoom(ChatMediator):
    """Mediator - the only component that knows about all participants."""
    def __init__(self, name: str) -> None:
        self._name = name
        self._participants: dict[str, Participant] = {}
        self._history: list[str] = []
    def join(self, participant: Participant) -> None:
        self._participants[participant.name] = participant
        self._broadcast("System", f"*** {participant.name} joined the room ***")
    def leave(self, participant: Participant) -> None:
        self._participants.pop(participant.name, None)
        self._broadcast("System", f"*** {participant.name} left the room ***")
    def send_message(self, sender: Participant, message: str) -> None:
        formatted = f"[{sender.name}]: {message}"
        self._history.append(formatted)
        self._broadcast(sender.name, formatted, exclude=sender.name)
    def _broadcast(self, sender_name: str, text: str, exclude: str = "") -> None:
        # Pass the real sender name so recipients can tell users from system notices
        for name, participant in self._participants.items():
            if name != exclude:
                participant.receive(sender_name, text)

class HumanUser(Participant):
    def receive(self, sender_name: str, message: str) -> None:
        print(f" [{self.name} sees] {message}")

class BotUser(Participant):
    def receive(self, sender_name: str, message: str) -> None:
        # Ignore system notices: "HelperBot joined" would otherwise match "help"
        if sender_name != "System" and "help" in message.lower():
            self.send("I can help! Type /faq for common questions.")

# Usage
room = ChatRoom("general")
alice = HumanUser("Alice", room)
bob = HumanUser("Bob", room)
helper_bot = BotUser("HelperBot", room)
room.join(alice)
room.join(bob)
room.join(helper_bot)
alice.send("Hello everyone!")
bob.send("I need some help with the course")
Behavioral Patterns - Quick Comparison
| Pattern | "I reach for this when..." | Key Python mechanism |
|---|---|---|
| Observer | Multiple objects react to one object's state changes | Weak references, asyncio.gather |
| Strategy | I need interchangeable algorithms | Protocol for duck-typed strategies |
| Command | I need undo, queuing, or audit logging | dataclass-based command objects |
| Chain of Responsibility | I have a pipeline of independent guards | Linked handlers or compose with __call__ |
| State | Behaviour changes based on internal state | Enum + table, or class-per-state |
| Template Method | Subclasses share algorithm structure, differ in steps | ABC with abstractmethod + hooks |
| Iterator | I need lazy, reusable traversal of a collection | __iter__ / __next__, or yield |
| Mediator | N components need to communicate without N² coupling | Central hub with join/broadcast |
Interview Q&A
Q1: In the Observer pattern, why do Python event buses need weak references, and when can you skip them?
Python frees an object only once no strong reference to it remains. A standard event bus that stores bound methods holds a strong reference to each method, which in turn holds a strong reference to its object (method.__self__). If the listener is "logically" destroyed by dropping the variable that pointed to it, the bus still keeps it alive, and the object leaks. The fix is weakref.WeakMethod for bound methods and weakref.ref for plain functions; a plain weakref.ref to a bound method does not work, because the bound-method object is created on attribute access and dies immediately. During publish, dead references (which dereference to None) are silently dropped. One caveat: a lambda or closure passed inline has no other owner, so a weakly referencing bus drops it at once. You can skip weak references when the event bus and all its listeners share the same lifetime - for example, everything is wired up at application startup and torn down together. In a long-running web server where listeners are created per request or per feature module, use weak references or pair every subscribe() with an explicit unsubscribe().
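A minimal sketch of such a bus, under stated assumptions: the class name WeakEventBus, the delivered-count return value of publish(), and the example email address are illustrative choices, not taken from the original EventBus. weakref.WeakMethod, weakref.ref and inspect.ismethod are standard-library calls.

```python
import gc
import inspect
import weakref
from collections import defaultdict
from typing import Any, Callable


class WeakEventBus:
    def __init__(self) -> None:
        self._listeners = defaultdict(list)  # event name -> list of weak references

    def subscribe(self, event: str, listener: Callable[[Any], None]) -> None:
        # Bound methods need WeakMethod; plain functions work with weakref.ref
        if inspect.ismethod(listener):
            ref = weakref.WeakMethod(listener)
        else:
            ref = weakref.ref(listener)
        self._listeners[event].append(ref)

    def publish(self, event: str, data: Any) -> int:
        """Call every live listener; return how many were called."""
        delivered = 0
        for ref in list(self._listeners[event]):
            listener = ref()  # None if the target has been collected
            if listener is not None:
                listener(data)
                delivered += 1
        # Prune dead references so the list does not grow without bound
        self._listeners[event] = [r for r in self._listeners[event] if r() is not None]
        return delivered


class EmailService:
    def on_user_registered(self, data: dict) -> None:
        print(f"Sending welcome email to {data['email']}")


bus = WeakEventBus()
email_svc = EmailService()
bus.subscribe("user.registered", email_svc.on_user_registered)

print(bus.publish("user.registered", {"email": "user@example.com"}))  # 1

email_svc = None  # drop the only strong reference
gc.collect()
print(bus.publish("user.registered", {"email": "user@example.com"}))  # 0
```

With this version the opening puzzle behaves the way most readers expect: once the last strong reference is gone, the listener stops firing.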
Q2: When should you use Protocol for the Strategy pattern instead of ABC?
Use Protocol when you want structural subtyping - the ability for any class that has the right method signatures to serve as a strategy, without inheriting from a base class. This is valuable when strategies might come from third-party libraries that you cannot modify, when you want lambda or simple function objects to work as strategies, or when teams prefer duck typing. Use ABC when you want to share implementation across strategies (concrete methods in the base), enforce a strict is-a hierarchy with isinstance checks, or communicate a strong contract that subclasses must explicitly agree to. In practice, Protocol is more Pythonic for "swappable algorithm" use cases; ABC is better when the strategies share meaningful logic through inheritance.
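As a rough illustration of structural subtyping, the sketch below defines a hypothetical DiscountStrategy protocol with a __call__ signature. A class instance, a plain function, and a lambda all satisfy it without inheriting from anything; all names here are invented for the example.

```python
from typing import Protocol


class DiscountStrategy(Protocol):
    """Anything callable as strategy(amount) -> float qualifies."""

    def __call__(self, amount: float) -> float: ...


class PercentageOff:
    # Does not inherit from DiscountStrategy; matching the signature is enough
    def __init__(self, percent: float) -> None:
        self._percent = percent

    def __call__(self, amount: float) -> float:
        return amount * (1 - self._percent / 100)


def flat_five_off(amount: float) -> float:
    return max(amount - 5.0, 0.0)


def checkout(amount: float, strategy: DiscountStrategy) -> float:
    return round(strategy(amount), 2)


print(checkout(200.0, PercentageOff(10)))      # 180.0
print(checkout(200.0, flat_five_off))          # 195.0
print(checkout(200.0, lambda amount: amount))  # 200.0
```

A static type checker verifies the signatures; at runtime nothing is enforced, which is the trade-off against an ABC.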
Q3: Explain the difference between Chain of Responsibility and Decorator pattern. They both chain things - how are they different?
Both pass a request through a series of objects, but the intent and structure differ. In the Decorator pattern, every handler in the chain processes the request and the chain always runs to completion - the purpose is to add behaviour (logging, caching, retry) around a core operation. Each decorator enriches the result. In Chain of Responsibility, any handler can short-circuit the chain by returning a response without calling the next handler - the purpose is to find the right handler for a request or to enforce a pipeline of guards where any guard can reject the request early. In FastAPI's middleware stack, each middleware (Decorator intent) always calls call_next unless it actively rejects the request (Chain intent). Production middleware stacks often blend both intents.
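The two intents can sit in the same stack. In the sketch below, written with plain functions rather than any framework API, with_logging always delegates to the inner handler (Decorator intent) while require_token may answer on its own (Chain intent). The handler names, the dict-shaped request and the "secret" token are assumptions made for the example.

```python
from typing import Callable

Handler = Callable[[dict], str]


def core(request: dict) -> str:
    return f"200 OK for {request['path']}"


def with_logging(inner: Handler, log: list) -> Handler:
    """Decorator intent: always calls the inner handler and adds behaviour around it."""
    def wrapper(request: dict) -> str:
        log.append(f"-> {request['path']}")
        response = inner(request)
        log.append(f"<- {response}")
        return response
    return wrapper


def require_token(inner: Handler) -> Handler:
    """Chain intent: may short-circuit without calling the inner handler."""
    def guard(request: dict) -> str:
        if request.get("token") != "secret":
            return "401 Unauthorized"  # the chain stops here
        return inner(request)
    return guard


log: list = []
app = with_logging(require_token(core), log)

print(app({"path": "/orders", "token": "secret"}))  # 200 OK for /orders
print(app({"path": "/orders"}))                     # 401 Unauthorized
print(log[-1])                                      # <- 401 Unauthorized
```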
Q4: Describe two ways to implement a State machine in Python and when you would choose each.
The first approach is an enum + transition table: define all states as an Enum, and all valid transitions as a dict mapping (current_state, action) to new_state. This is excellent for simple FSMs with straightforward transitions, because all transitions are visible in one place and it is easy to visualise the graph. Adding a new state is a matter of adding enum values and dict entries. The second approach is class-per-state: each state is a class that implements a common State interface, and each state class defines what happens for each event, including transitioning to the next state. This is better when each state has significant, unique logic that would otherwise be nested inside the transition table or the main context class. The class-per-state approach also makes individual states easier to test in isolation. For most applications with fewer than ten states and simple transition logic, the enum + table approach wins on clarity.
Q5: What is the Template Method pattern and how does it relate to the Hollywood Principle?
Template Method defines the skeleton of an algorithm in a base class method and lets subclasses override specific steps - but not the algorithm's overall structure. The base class calls the overridable steps; subclasses do not drive the algorithm themselves. This is precisely the Hollywood Principle: "don't call us, we'll call you." The base class is in control; it calls into the subclass when needed. The contrast with the Strategy pattern is in how the variation is supplied: Strategy uses composition (the context holds a strategy object that can be swapped at runtime), whereas Template Method uses inheritance (the steps are fixed when the subclass is defined). In Python, this is expressed with ABC + abstractmethod for mandatory steps and regular methods for hooks (optional overrides with defaults). A real-world example is any training loop framework: the framework defines the training skeleton (train()) and calls user-defined steps like _build_model() and _run_training_loop().
